---
title: Guide - Apache DataFu Spark
version: 1.6.1
section_name: Apache DataFu Spark
license: >
   Licensed to the Apache Software Foundation (ASF) under one or more
   contributor license agreements.  See the NOTICE file distributed with
   this work for additional information regarding copyright ownership.
   The ASF licenses this file to You under the Apache License, Version 2.0
   (the "License"); you may not use this file except in compliance with
   the License.  You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License.
---

# Guide

Apache DataFu Spark is a collection of utils and user-defined functions for working with large scale data in [Apache Spark](https://spark.apache.org/).
It has a number of useful functions available.  This guide will provide examples of how to use these functions and serves as an overview for working with the library.

## Spark Compatibility

The current version of DataFu has been tested against Spark versions 2.1.x - 2.4.x, in Scala 2.10, 2.11 and 2.12 (where applicable).  The JAR for Scala 2.11 has been published to the [Apache Maven Repository](https://repository.apache.org/content/groups/public/org/apache/datafu/).  Other versions can be built by [downloading the source](/docs/download.html) and following the build instructions.

## Calling DataFu Spark functions from PySpark

In order to call the datafu-spark API's from Pyspark, you can do the following (tested on a Hortonworks vm)

First, call pyspark with the following parameters

```bash
export PYTHONPATH=datafu-spark_2.11-<%= current_page.data.version %>.jar

pyspark --jars datafu-spark_2.11-<%= current_page.data.version %>-SNAPSHOT.jar --conf spark.executorEnv.PYTHONPATH=datafu-spark_2.11-<%= current_page.data.version %>-SNAPSHOT.jar
```

The following is an example of calling the Spark version of the datafu _dedup_ method

```python
from pyspark_utils.df_utils import PySparkDFUtils

df_utils = PySparkDFUtils()

df_people = sqlContext.createDataFrame([
     ("a", "Alice", 34),
     ("a", "Sara", 33),
     ("b", "Bob", 36),
     ("b", "Charlie", 30),
     ("c", "David", 29),
     ("c", "Esther", 32),
     ("c", "Fanny", 36),
     ("c", "Zoey", 36)],
     ["id", "name", "age"])

func_dedup_res = df_utils.dedup_with_order(dataFrame=df_people, groupCol=df_people.id,
                                orderCols=[df_people.age.desc(), df_people.name.desc()])

func_dedup_res.registerTempTable("dedup")

func_dedup_res.show()
```

This should produce the following output

<pre>
+---+-----+---+
| id| name|age|
+---+-----+---+
|  c| Zoey| 36|
|  b|  Bob| 36|
|  a|Alice| 34|
+---+-----+---+
</pre>

## Using DataFu to do Skewed Joins

DataFu-Spark contains two methods for doing skewed joins.

_broadcastJoinSkewed_ can be used in cases when one data frame is skewed and the other is not skewed. It splits both of the data frames to two parts according to the skewed keys.
For example, let's say we have two data frames, _customers_, which isn't skewed:

<pre>
+-------+-----------+
|company|year_joined|
+-------+-----------+
| paypal|       2017|
| myshop|       2019|
+-------+-----------+
</pre>

And _transactions_, which is skewed on the field _company_:

<pre>
+--------------+-------+
|transaction_id|company|
+--------------+-------+
|             1| paypal|
|             2| paypal|
|             3| paypal|
|             4| paypal|
|             5| paypal|
|             6| paypal|
|             7| paypal|
|             8| paypal|
|             9| myshop|
+--------------+-------+
</pre>

In order to join them, we need to determine how many rows we would like to broadcast. In our case, with only one skewed key, we would use 1, like this:

```scala
val result = customers.broadcastJoinSkewed(transactions, Seq("company", 1))
```

The result will look like this, just as if we had used a regular join.

<pre>
+-------+-----------+--------------+
|company|year_joined|transaction_id|
+-------+-----------+--------------+
| myshop|       2019|             9|
| paypal|       2017|             1|
| paypal|       2017|             2|
| paypal|       2017|             3|
| paypal|       2017|             4|
| paypal|       2017|             5|
| paypal|       2017|             6|
| paypal|       2017|             7|
| paypal|       2017|             8|
+-------+-----------+--------------+
</pre>

## Doing a join between a number and a range

An interesting type of join that DataFu allows you to do is between a point and a range. A naive solution for this might explode the range columns, but this would cause the table to become huge.
The DataFu _joinWithRange_ method takes a decrease factor in order to deal with this problem.

For an example, let's imagine two data frames, one of graded papers and one representing a system for scoring.

The dataframe for grades might look like this:

<pre>
+-----+-------+
|grade|student|
+-----+-------+
|   37| tyrion|
|   72|   robb|
|   83| renley|
|   64|    ned|
|   95|  sansa|
|   88|   arya|
|   79| cersei|
|   81|  jaime|
+-----+-------+
</pre>

The scoring system might look like this:

<pre>
+-----+---+---+
|grade|min|max|
+-----+---+---+
|    A| 90|100|
|    B| 80| 90|
|    C| 70| 80|
|    D| 60| 70|
|    F|  0| 60|
+-----+---+---+
</pre>

We will use a decrease factor of 10, since each range is of size at least 10.

```scala
skewed.joinWithRange("grade", notskewed, "min", "max", 2).show
```

Our result will be as follows:

<pre>
+-----+-------+-----+---+---+
|grade|student|grade|min|max|
+-----+-------+-----+---+---+
|   37| tyrion|    F|  0| 60|
|   72|   robb|    C| 70| 80|
|   83| renley|    B| 80| 90|
|   64|    ned|    D| 60| 70|
|   95|  sansa|    A| 90|100|
|   88|   arya|    B| 80| 90|
|   79| cersei|    C| 70| 80|
|   81|  jaime|    B| 80| 90|
+-----+-------+-----+---+---+
</pre>

In order to use _joinWithRange_ on tables, they need to meet two requirements:

1. the points table (_grades_ in our example) needs to be distinct on the point column
2. the range and point columns need to be numeric

If there are ranges that overlap, a point that matches will be joined to all the ranges that include it. In order to take only one range per point, you can use the _joinWithRangeAndDedup_ method.

It takes the same parameters as _joinWithRange_, with one addition - whether to match the largest or smallest range that contains a point.


